Skip to content

[FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready#27921

Open
Samrat002 wants to merge 3 commits intoapache:masterfrom
Samrat002:FLINK-36753
Open

[FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready#27921
Samrat002 wants to merge 3 commits intoapache:masterfrom
Samrat002:FLINK-36753

Conversation

@Samrat002
Copy link
Copy Markdown
Contributor

What is the purpose of the change

FLIP-461 introduced checkpoint-synchronized rescaling where the Adaptive Scheduler waits for a checkpoint to complete before rescaling. However, it passively waits for the next periodic checkpoint, which can delay rescaling significantly when checkpoint intervals are large (e.g., 10 minutes).
This PR makes the Adaptive Scheduler actively trigger a checkpoint when resources change and rescaling is desired. The trigger fires at the right time. ie, when the DefaultStateTransitionManager enters the Stabilizing or Stabilized phase (i.e., when the resource gate is open and the scheduler is waiting for the checkpoint gate). The feature is controlled by a new configuration option jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled (default: false).

The feature respects execution.checkpointing.min-pause, skips if a checkpoint is already in progress, and only fires when parallelism has actually changed.

Brief change log

  • Added requestActiveCheckpointTrigger() to StateTransitionManager.Context interface
  • DefaultStateTransitionManager calls requestActiveCheckpointTrigger() when entering Stabilizing, on onChange during Stabilizing, and when entering Stabilized
  • Executing implements the callback with guard conditions (config enabled, checkpointing configured, parallelism changed, no checkpoint in progress)
  • Added config option jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled wired through AdaptiveScheduler.Settings
  • Added integration test proving rescale happens without periodic checkpoints or manual triggers

Verifying this change

This change added tests and can be verified as follows:

  • Added RescaleOnCheckpointITCase#testRescaleWithActiveCheckpointTrigger that starts a job with checkpointing interval of 1 hour, maxTriggerDelay set to infinity, and no manual triggerCheckpoint() call.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 12, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Samrat002 Samrat002 marked this pull request as ready for review April 13, 2026 17:20
@Samrat002
Copy link
Copy Markdown
Contributor Author

@1996fanrui PTAL whenever time.

Copy link
Copy Markdown
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution. I've left a couple of comments, however I don't have context to review whether this is properly integrated with AdatpiveScheduler and DefaultStateTransitionManager. Would be great for someone else to take a look as well.

public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
.booleanType()
.defaultValue(false)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a downside of using this option? If we expect this to be generally positive change, and you disable it by default only as a pre-caution/for backward compatibility, I would be actually fine setting it by default to true.

Copy link
Copy Markdown
Contributor Author

@Samrat002 Samrat002 Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier, I chose a defensive approach. There is no compelling reason to keep it false.
Updated default value to true

Comment on lines +223 to +228
waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
LOG.info(
"Waiting for {} slot(s) to become available after scale down.",
expectedFreeSlotCount);
waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand your test, it would still pass after 1h, after the regular periodic checkpoint is triggered after 1h, even with your new option disabled, right?

I think you should make sure that the timeout in waitForRunningTasks waitForAvailableSlots (or CI timeout 4h) is longer than env.enableCheckpointing(Duration.ofHours(1).toMillis());. So either, decrease the timeout in waiting to < 30 minutes, or increase checkpointing interval to 24h (CI will be killed after 4h AFAIR).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated checkpointing to 24 hours. also added assertions for min-pause

Copy link
Copy Markdown
Contributor

@ztison ztison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, thanks for the PR. We look at it with @XComp and found few things to improve.

+ "rather than waiting for the next periodic checkpoint. "
+ "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ "The active trigger respects %s and will not trigger if a checkpoint is already in progress.",
text("execution.checkpointing.min-pause"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might probably be:
code(CheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.key()))

resourceStabilizationTimeout,
firstChangeEventTimestamp,
maxTriggerDelay));
transitionContext.requestActiveCheckpointTrigger();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this call is needed here? ISn't it enough to call it in org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing#onChange ?


private void progressToStabilized(Temporal firstChangeEventTimestamp) {
progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
transitionContext.requestActiveCheckpointTrigger();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it looks the method is called on many places. Wondering if we could control when it is called only in Phases. So moving this to Stabilized phase?

"When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
+ "rather than waiting for the next periodic checkpoint. "
+ "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ "The active trigger respects %s and will not trigger if a checkpoint is already in progress.",
Copy link
Copy Markdown
Contributor

@ztison ztison Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it really respect the min-pause? What I see it only respects that it doesn't trigger a new checkpoint when another is in a progress.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh check for minpause was missing . PTAL


/**
* Requests the context to actively trigger a checkpoint to expedite rescaling. Called when
* the {@link DefaultStateTransitionManager} enters a phase that is ready to accept {@link
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it true? I see that the method is called on more places: entering Stabilizing, entering Stabilized, and on each onChange event while in Stabilizing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have fixed the problem now. PTAL at the revised version

* <li>No checkpoint must be currently in progress or being triggered
* </ul>
*/
private void triggerCheckpointForRescale() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to cover all possible paths by tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests PTAL

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Apr 15, 2026
@Samrat002 Samrat002 requested review from pnowojski and ztison April 16, 2026 18:53
@Samrat002
Copy link
Copy Markdown
Contributor Author

@flinkbot run azure

@Samrat002
Copy link
Copy Markdown
Contributor Author

@ztison @pnowojski PTAL . i have addressed to review comments

added Unit tests , made the IT more robust and ensured minpause is respected

@ztison
Copy link
Copy Markdown
Contributor

ztison commented Apr 20, 2026

@ztison @pnowojski PTAL . i have addressed to review comments

added Unit tests , made the IT more robust and ensured minpause is respected

Thanks for incorporating our improvements. I was on a vacation the last few days so I haven't responded. I am back, I will check the PR today or tomorrow.

Copy link
Copy Markdown
Contributor

@ztison ztison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see some issues with retry logic.

* satisfy the configured {@code minPauseBetweenCheckpoints}. This can be used by callers that
* trigger non-periodic checkpoints but still wish to respect the min-pause constraint.
*/
public boolean isMinPauseBetweenCheckpointsSatisfied() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnowojski Is this safe without lock?

return;
}

if (!checkpointCoordinator.isMinPauseBetweenCheckpointsSatisfied()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you return a remaining time to the next checkpoint instead of boolean, then you can directly use it in following call of context.runIfState( this, this::requestActiveCheckpointTrigger, remainingTimeToSatisfyMinPause) and you can get rid of hardcoded ACTIVE_CHECKPOINT_RETRY_DELAY .

.warn(
"Active checkpoint trigger for rescale failed, scheduling retry.",
throwable);
context.runIfState(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to introduce this endless cycle? I feel we should just log it and give it up. If the checkpoint is failing there probably is different issue with job and we shouldn't try it again and again without any retry cap.

getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: min pause between checkpoints not satisfied, scheduling retry.");
context.runIfState(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get e.g. 10 onChange events then we will schedule this method 10 times. We should have some kind of deduplication.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants